/**
 * Copyright (c) 2023 murenchao
 * taomu is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *       http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
package cool.taomu.mqtt.broker.impl

import cool.taomu.mqtt.broker.entity.ClientSessionEntity
import cool.taomu.mqtt.broker.entity.MessageEntity
import cool.taomu.mqtt.broker.entity.PublishEntity
import cool.taomu.mqtt.broker.entity.TopicEntity
import cool.taomu.mqtt.broker.utils.MqttUtils
import cool.taomu.mqtt.broker.utils.impl.DataStorage
import cool.taomu.utils.inter.IObservable
import cool.taomu.utils.inter.IObserver
import cool.taomu.utils.inter.IStorage
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.handler.codec.mqtt.MqttFixedHeader
import io.netty.handler.codec.mqtt.MqttMessageType
import io.netty.handler.codec.mqtt.MqttPublishMessage
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader
import io.netty.handler.codec.mqtt.MqttQoS
import java.util.concurrent.atomic.AtomicInteger
import org.apache.commons.lang3.SerializationUtils
import org.apache.oro.text.perl.Perl5Util
import org.eclipse.xtend.lib.annotations.Accessors
import org.slf4j.LoggerFactory

@Accessors
class Retain implements IObserver {
	val LOG = LoggerFactory.getLogger(Retain);

	IStorage cache = new DataStorage();

	MessageEntity msg;

	static AtomicInteger count = new AtomicInteger(0);
	int number = 0;

	new(MessageEntity msg) {
		this.msg = msg;
	}

	new() {
		count.incrementAndGet();
		this.number = count.intValue;
	}

	private def void publishMessage(TopicEntity topic) {
		try {
			var p5 = new Perl5Util();
			var subTopicName = topic.name.replace("/+", "/[a-zA-Z]?[a-zA-Z0-9]+").replace("/#",
				"/[a-zA-Z]?([a-zA-Z0-9/]*)").replace("/", "\\/");
			if (p5.match("/" + subTopicName + "/", msg.topic)) {
				LOG.info("发送者id : {},  Topic : {}", msg.senderId, msg.topic);
				// 推送匹配上的消息
				var minQos = MqttUtils.getQos(msg.qos, topic.qos);
				if (minQos == 2) {
					// 记录QoS2信息
					var cloneMsg = SerializationUtils.clone(msg);
					cloneMsg.senderChannel = msg.senderChannel;
					cache.put("mqtt-qos2-message",topic.clientId, cloneMsg);
				}
				LOG.info("cache is null:{} client id:{}", cache === null, topic.clientId);
				var clientSession = cache.get("mqtt-session",topic.clientId) as ClientSessionEntity;
				LOG.info("订阅者id : {},  Topic : {}, 发送内容长度： {}", topic.clientId, msg.payload.length);
				var entity = new PublishEntity(MqttQoS.valueOf(minQos), topic.name,
					clientSession.generateMessageId as Integer, msg.payload, false);
				clientSession.ctx.writeAndFlush(response(entity));
			}
		} catch (Exception ex) {
			LOG.debug("publishMessage 方法出现错误 : ", ex);
		}
	}

	private def response(PublishEntity entity) {
		var header = new MqttFixedHeader(MqttMessageType.PUBLISH, entity.dup, entity.qos, false, 0);
		var varHeader = new MqttPublishVariableHeader(entity.topicName, entity.messageId);
		var ByteBuf heapBuf;
		if (entity.payload === null) {
			heapBuf = Unpooled.EMPTY_BUFFER;
		} else {
			try {
				heapBuf = Unpooled.wrappedBuffer(entity.payload);
			} catch (IllegalArgumentException e) {
				e.printStackTrace;
			}
		}
		return new MqttPublishMessage(header, varHeader, heapBuf);
	}

	override publish(IObservable<?> o, Object arg) {
		if (arg instanceof TopicEntity) {
			this.publishMessage(arg);
		}
	}

}
